Log collection with the traditional ELK architecture:
Problems: Logstash is resource-hungry, consuming a lot of CPU and memory at runtime. There is also no message queue in front of it to buffer data, so data loss is a risk. This setup suits small clusters.
The second architecture:
A Log Agent on each node first ships logs to Kafka; a Log Transfer process then takes messages off the queue and writes them to Elasticsearch for storage; finally, Kibana presents the logs and data to users. Because Kafka persists the data first, nothing is lost even if the transfer server (Logstash) stops due to a failure. This architecture suits larger clusters.
Component overview:
LogAgent: a log collection client that gathers logs on each server.
Kafka: a high-throughput distributed queue (developed at LinkedIn, now a top-level Apache project), used here for message queuing and log buffering.
Elasticsearch: an open-source search engine that exposes an HTTP RESTful web interface.
Kibana: an open-source analytics and visualization tool for Elasticsearch data.
Hadoop: a distributed computing framework, a platform for distributed processing of large data sets.
Storm: a free, open-source distributed real-time computation system.
Zookeeper: ZooKeeper is a distributed coordination service for managing large fleets of hosts. Coordinating and managing services in a distributed environment is a complex problem, which ZooKeeper solves with its simple architecture and API.
In the architecture diagram (not reproduced here), ZooKeeper plays the part highlighted in red.
Elasticsearch: a distributed, scalable, real-time search and analytics engine built on top of the full-text search library Apache Lucene™. Elasticsearch is more than just Lucene, though; beyond full-text search it also provides:
Distributed real-time document storage, with every field indexed and searchable.
Distributed search with real-time analytics.
Scaling to hundreds of servers and petabytes of structured or unstructured data.
Kibana is an open-source analytics and visualization platform designed to work with Elasticsearch. You can use Kibana to search and view data stored in Elasticsearch; it presents that data through charts, tables, maps, and other visualizations, turning raw records into advanced, at-a-glance analysis.
Elasticsearch, Logstash, and Kibana together are the ELK stack, and the combination is an elegant piece of design in the big-data world: a classic MVC split. Logstash plays the controller, collecting and filtering data; Elasticsearch is the model/persistence layer, storing it; and Kibana, the focus here, is the view, offering queries across many dimensions and a graphical interface onto the data held in Elasticsearch.
etcd is a distributed key-value store developed by CoreOS. It uses the Raft protocol internally for consensus and is built to store and serve critical data reliably and quickly. It enables dependable distributed coordination through distributed locks, leader election, and write barriers, and an etcd cluster is designed for high availability and durable data storage and retrieval.
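As a quick illustration of the distributed-lock primitive just mentioned, here is a minimal sketch using clientv3's concurrency helper (the endpoint and the lock key /my-lock are illustrative, not part of this project):

package main

import (
    "context"
    "fmt"
    "time"

    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/clientv3/concurrency"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Println("connect to etcd failed:", err)
        return
    }
    defer cli.Close()
    // A session keeps a lease alive; the mutex is scoped to that session.
    session, err := concurrency.NewSession(cli)
    if err != nil {
        fmt.Println("new session failed:", err)
        return
    }
    defer session.Close()
    mutex := concurrency.NewMutex(session, "/my-lock") // hypothetical lock key
    if err := mutex.Lock(context.Background()); err != nil {
        fmt.Println("lock failed:", err)
        return
    }
    fmt.Println("acquired lock")
    // ... critical section ...
    mutex.Unlock(context.Background())
}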
etcd architecture diagram (image not reproduced here).
Source code:
The logagent package
config.ini
[kafka]
address=127.0.0.1:9092
chan_max_size=100000
[etcd]
address=127.0.0.1:2379
timeout=5
collect_log_key=/log/%s/collect_config
[taillog]
filename=./my.log
timeout=5
config.go
package conf

// AppConf maps the whole config.ini file.
type AppConf struct {
    KafkaConf   `ini:"kafka"`
    EtcdConf    `ini:"etcd"`
    TaillogConf `ini:"taillog"`
}

// KafkaConf maps the [kafka] section.
type KafkaConf struct {
    Address     string `ini:"address"`
    ChanMaxSize int    `ini:"chan_max_size"`
}

// EtcdConf maps the [etcd] section.
type EtcdConf struct {
    Address string `ini:"address"`
    Key     string `ini:"collect_log_key"`
    Timeout int    `ini:"timeout"`
}

// TaillogConf maps the [taillog] section.
type TaillogConf struct {
    Filename string `ini:"filename"`
}
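For reference, a minimal sketch of how this struct is filled from config.ini with gopkg.in/ini.v1 (the import path test/log/conf matches the imports used in main.go further below):

package main

import (
    "fmt"

    "test/log/conf"

    "gopkg.in/ini.v1"
)

func main() {
    var cfg conf.AppConf
    // MapTo matches [section] headers to the `ini:"..."` tags above.
    if err := ini.MapTo(&cfg, "./conf/config.ini"); err != nil {
        fmt.Println("load ini failed, err:", err)
        return
    }
    fmt.Println(cfg.KafkaConf.Address, cfg.EtcdConf.Key, cfg.TaillogConf.Filename)
}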
etcd.go
package etcd

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "go.etcd.io/etcd/clientv3"
)

var (
    cli *clientv3.Client
)

// LogEntry describes one log file to collect and the Kafka topic to send it to.
type LogEntry struct {
    Path  string `json:"path"`
    Topic string `json:"topic"`
}

// Init initializes the etcd client.
func Init(addr string, timeout time.Duration) (err error) {
    cli, err = clientv3.New(clientv3.Config{
        Endpoints:   []string{addr},
        DialTimeout: timeout,
    })
    if err != nil {
        fmt.Println("connect to etcd failed, err:", err)
        return
    }
    fmt.Println("connect to etcd success")
    return
}

// GetConf fetches the log collection config stored under key.
func GetConf(key string) (LogEntryConf []*LogEntry, err error) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    resp, err := cli.Get(ctx, key)
    cancel()
    if err != nil {
        fmt.Println("get from etcd failed, err:", err)
        return
    }
    for _, ev := range resp.Kvs {
        err = json.Unmarshal(ev.Value, &LogEntryConf)
        if err != nil {
            fmt.Println("unmarshal etcd value failed, err:", err)
            return
        }
        fmt.Printf("value:%s\n", ev.Value)
    }
    return
}

// WatchConf watches key and notifies taillog's task manager of every change.
func WatchConf(key string, newConfCh chan<- []*LogEntry) {
    ch := cli.Watch(context.Background(), key)
    // Receive watch notifications from the channel.
    for wresp := range ch {
        for _, evt := range wresp.Events {
            fmt.Printf("Type:%v key:%v value:%v\n", evt.Type, string(evt.Kv.Key), string(evt.Kv.Value))
            var newConf []*LogEntry
            // A DELETE event carries an empty value: skip unmarshaling and push
            // an empty config so the task manager stops the affected tasks.
            if evt.Type != clientv3.EventTypeDelete {
                err := json.Unmarshal(evt.Kv.Value, &newConf)
                if err != nil {
                    fmt.Println("unmarshal failed, err:", err)
                    continue
                }
            }
            fmt.Println("get new conf:", newConf)
            newConfCh <- newConf
        }
    }
}
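A hedged usage sketch of this package (the key matches the one written by etcd_put.go below; error handling trimmed for brevity):

package main

import (
    "fmt"
    "time"

    "test/log/etcd"
)

func main() {
    if err := etcd.Init("127.0.0.1:2379", 5*time.Second); err != nil {
        return
    }
    entries, err := etcd.GetConf("/log/192.168.1.7/collect_config")
    if err != nil {
        return
    }
    for _, e := range entries {
        fmt.Println(e.Path, e.Topic)
    }
    // WatchConf blocks, pushing each new config into the channel.
    newConfCh := make(chan []*etcd.LogEntry)
    go etcd.WatchConf("/log/192.168.1.7/collect_config", newConfCh)
    fmt.Println("new conf:", <-newConfCh)
}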
etcd_put.go
package main

import (
    "context"
    "fmt"
    "time"

    "go.etcd.io/etcd/clientv3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"}, // cluster endpoints
        DialTimeout: 5 * time.Second,            // give up if no connection within 5 seconds
    })
    if err != nil {
        fmt.Println("connect to etcd failed:", err)
        return
    }
    fmt.Println("connect to etcd success")
    defer cli.Close()
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    value := `[{"path":"d:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"},{"path":"d:/xxx/mysql.log","topic":"mysql_log"}]`
    _, err = cli.Put(ctx, "/log/192.168.1.7/collect_config", value)
    cancel()
    if err != nil {
        fmt.Println("put to etcd failed, err:", err)
        return
    }
}
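The written value can then be verified from the shell (assuming a default local etcd with the v3 API):

ETCDCTL_API=3 etcdctl --endpoints=127.0.0.1:2379 get /log/192.168.1.7/collect_config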
kafka.go
package kafka

// Producer side of the log agent: ships collected log lines to Kafka.
import (
    "fmt"
    "time"

    "github.com/Shopify/sarama"
)

// logData couples one log line with its destination topic.
type logData struct {
    topic string
    data  string
}

var (
    client      sarama.SyncProducer // global Kafka producer client
    logDataChan chan *logData
)

// Init initializes the producer client and starts the background sender.
func Init(addrs []string, maxSize int) (err error) {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll          // wait until leader and followers have acked
    config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
    config.Producer.Return.Successes = true                   // delivered messages are reported on the Successes channel
    // Connect to Kafka.
    client, err = sarama.NewSyncProducer(addrs, config)
    if err != nil {
        fmt.Println("producer closed, err:", err)
        return
    }
    fmt.Println("connect to kafka success!")
    // Initialize logDataChan.
    logDataChan = make(chan *logData, maxSize)
    // Start a background goroutine that drains the channel and sends to Kafka.
    go sendToKafka()
    return
}

// SendToChan is the function exposed to other packages; it only pushes the log
// line into an internal channel.
func SendToChan(topic, data string) {
    msg := &logData{
        topic: topic,
        data:  data,
    }
    logDataChan <- msg
}

// sendToKafka does the actual sending.
func sendToKafka() {
    for {
        select {
        case ld := <-logDataChan:
            // Build the message.
            msg := &sarama.ProducerMessage{}
            msg.Topic = ld.topic
            msg.Value = sarama.StringEncoder(ld.data)
            // Send it to Kafka; offset is the position the message was written at.
            pid, offset, err := client.SendMessage(msg)
            if err != nil {
                fmt.Println("send msg failed, err:", err)
                continue // one failed send shouldn't kill the sender goroutine
            }
            fmt.Printf("pid:%v offset:%v\n", pid, offset)
        default:
            time.Sleep(time.Millisecond * 50)
        }
    }
}
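A short usage sketch of the producer side (address and buffer size taken from config.ini; the sleep only gives the background goroutine time to flush before the program exits):

package main

import (
    "time"

    "test/log/kafka"
)

func main() {
    if err := kafka.Init([]string{"127.0.0.1:9092"}, 100000); err != nil {
        return
    }
    kafka.SendToChan("web_log", "hello kafka") // queued, then sent by sendToKafka
    time.Sleep(time.Second)
}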
tail.go
package taillog

import (
    "context"
    "fmt"

    "test/log/kafka"

    "github.com/hpcloud/tail"
)

// TailTask is one log collection task.
type TailTask struct {
    path     string
    topic    string
    instance *tail.Tail
    // ctx/cancelFunc let us stop t.run() when the task is removed.
    ctx        context.Context
    cancelFunc context.CancelFunc
}

func NewTailTask(path, topic string) (tailObj *TailTask) {
    ctx, cancel := context.WithCancel(context.Background())
    tailObj = &TailTask{
        path:       path,
        topic:      topic,
        ctx:        ctx,
        cancelFunc: cancel,
    }
    tailObj.init() // open the log file at the given path
    return
}

func (t *TailTask) init() {
    config := tail.Config{
        ReOpen:    true,                                 // reopen the file if it is recreated (rotation)
        Follow:    true,                                 // keep following new lines
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // start reading from the end of the file
        MustExist: false,                                // don't error if the file doesn't exist yet
        Poll:      true,
    }
    var err error
    t.instance, err = tail.TailFile(t.path, config)
    if err != nil {
        fmt.Println("tail file failed, err:", err)
        return // don't start collecting without a tail instance
    }
    go t.run() // collect lines and ship them to Kafka
}

func (t *TailTask) run() {
    for {
        select {
        case <-t.ctx.Done():
            fmt.Printf("tail task:%v_%s finished...\n", t.path, t.topic)
            return
        case line := <-t.instance.Lines: // read the log file line by line
            // An earlier version called a kafka send function directly here;
            // instead, push the line into a channel that a dedicated goroutine
            // in the kafka package drains and ships to Kafka.
            kafka.SendToChan(t.topic, line.Text)
        }
    }
}
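A single task can be exercised on its own for a quick test (kafka.Init must run first because run() pushes lines into that package's channel; the path and topic are illustrative):

package main

import (
    "test/log/kafka"
    "test/log/taillog"
)

func main() {
    if err := kafka.Init([]string{"127.0.0.1:9092"}, 1000); err != nil {
        return
    }
    taillog.NewTailTask("./my.log", "web_log") // tails in its own goroutine
    select {} // keep the process alive
}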
tail_mgr.go
package taillog

import (
    "fmt"
    "time"

    "test/log/etcd"
)

var tskMgr *taillogMgr

// taillogMgr manages all running TailTasks.
type taillogMgr struct {
    logEntry    []*etcd.LogEntry
    tskMap      map[string]*TailTask
    newConfChan chan []*etcd.LogEntry
}

func Init(logEntryConf []*etcd.LogEntry) {
    tskMgr = &taillogMgr{
        logEntry:    logEntryConf, // remember the current collection config
        tskMap:      make(map[string]*TailTask, 16),
        newConfChan: make(chan []*etcd.LogEntry), // unbuffered channel
    }
    for _, logEntry := range logEntryConf {
        // logEntry.Path is the path of the log file to collect.
        // Record every TailTask started here so later configs can be diffed against it.
        tailObj := NewTailTask(logEntry.Path, logEntry.Topic)
        mk := fmt.Sprintf("%s_%s", logEntry.Path, logEntry.Topic)
        tskMgr.tskMap[mk] = tailObj
    }
    go tskMgr.run()
}

// run listens on its newConfChan and reconciles the running tasks whenever a
// new config arrives: 1. entries added, 2. entries deleted, 3. entries changed.
func (t *taillogMgr) run() {
    for {
        select {
        case newConf := <-t.newConfChan:
            fmt.Println("new config arrived!", newConf)
            // Start tasks for entries we don't have yet.
            for _, conf := range newConf {
                mk := fmt.Sprintf("%s_%s", conf.Path, conf.Topic)
                if _, ok := t.tskMap[mk]; ok {
                    continue // already running, nothing to do
                }
                t.tskMap[mk] = NewTailTask(conf.Path, conf.Topic)
            }
            // Stop tasks that exist in t.logEntry but are absent from newConf.
            for _, c1 := range t.logEntry { // each entry of the old config
                isDelete := true
                for _, c2 := range newConf { // compare against the new config
                    if c2.Path == c1.Path && c2.Topic == c1.Topic {
                        isDelete = false
                        break
                    }
                }
                if isDelete {
                    // stop c1's TailTask and forget it
                    mk := fmt.Sprintf("%s_%s", c1.Path, c1.Topic)
                    t.tskMap[mk].cancelFunc()
                    delete(t.tskMap, mk)
                }
            }
            t.logEntry = newConf // remember the new config for the next diff
        default:
            time.Sleep(time.Second)
        }
    }
}

// NewConfChan exposes tskMgr's newConfChan (send-only) to other packages.
func NewConfChan() chan<- []*etcd.LogEntry {
    return tskMgr.newConfChan
}
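The manager can also be driven without etcd, which is handy for testing the reconcile logic (a sketch; in the real program only etcd.WatchConf writes to this channel):

package main

import (
    "test/log/etcd"
    "test/log/kafka"
    "test/log/taillog"
)

func main() {
    if err := kafka.Init([]string{"127.0.0.1:9092"}, 1000); err != nil {
        return
    }
    taillog.Init([]*etcd.LogEntry{{Path: "./my.log", Topic: "web_log"}})
    // Simulate a config change arriving from etcd: keep my.log, add new.log.
    taillog.NewConfChan() <- []*etcd.LogEntry{
        {Path: "./my.log", Topic: "web_log"},
        {Path: "./new.log", Topic: "new_log"},
    }
    select {}
}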
ip.go
package utils

import "net"

// GetOutboundIP returns the machine's preferred outbound IP.
func GetOutboundIP() (ip string, err error) {
    conn, err := net.Dial("udp", "8.8.8.8:80")
    if err != nil {
        return
    }
    defer conn.Close()
    localAddr := conn.LocalAddr().(*net.UDPAddr)
    ip = localAddr.IP.String() // IP only; the port lives in localAddr.Port
    return
}
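Note that dialing UDP sends no packets: the kernel merely selects the route and local address, so 8.8.8.8 is never actually contacted. Usage:

package main

import (
    "fmt"

    "test/log/utils"
)

func main() {
    ip, err := utils.GetOutboundIP()
    if err != nil {
        panic(err)
    }
    fmt.Println(ip) // e.g. 192.168.1.7; main.go substitutes this into collect_log_key
}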
main.go
package main

import (
    "fmt"
    "sync"
    "time"

    "test/log/conf"
    "test/log/etcd"
    "test/log/kafka"
    "test/log/taillog"
    "test/log/utils"

    "gopkg.in/ini.v1"
)

var (
    cfg = new(conf.AppConf)
)

func main() {
    // 0. Load the config file.
    err := ini.MapTo(cfg, "./conf/config.ini")
    if err != nil {
        fmt.Println("load ini failed, err:", err)
        return
    }
    // 1. Initialize the Kafka connection.
    err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.ChanMaxSize)
    if err != nil {
        fmt.Println("init kafka failed, err:", err)
        return
    }
    fmt.Println("init kafka success!")
    // 2. Initialize etcd.
    err = etcd.Init(cfg.EtcdConf.Address, time.Duration(cfg.EtcdConf.Timeout)*time.Second)
    if err != nil {
        fmt.Println("init etcd failed, err:", err)
        return
    }
    // Each logagent pulls its own config, so the key is parameterized by the host's IP.
    ipStr, err := utils.GetOutboundIP()
    if err != nil {
        panic(err)
    }
    etcdConfKey := fmt.Sprintf(cfg.EtcdConf.Key, ipStr)
    fmt.Printf("etcdConfKey:%s\n", etcdConfKey)
    // 2.1 Fetch the log collection config from etcd.
    logEntryConf, err := etcd.GetConf(etcdConfKey)
    if err != nil {
        fmt.Println("etcd.GetConf failed, err:", err)
        return
    }
    fmt.Println("get conf from etcd success:", logEntryConf)
    // 2.2 Post a sentry to watch the config for changes, so the logagent can hot-reload.
    for index, value := range logEntryConf {
        fmt.Printf("index:%v value:%v\n", index, value)
    }
    fmt.Println("init etcd success.")
    // 3. Collect logs and send them to Kafka.
    // 3.1 Create a TailTask for every collection entry.
    taillog.Init(logEntryConf)
    // NewConfChan reads tskMgr's newConfChan, which is initialized inside
    // taillog.Init, so it must only be called after taillog.Init.
    newConfChan := taillog.NewConfChan() // the channel taillog exposes to the outside
    var wg sync.WaitGroup
    wg.Add(1)
    go etcd.WatchConf(etcdConfKey, newConfChan) // the sentry pushes fresh configs into this channel
    wg.Wait()
}
The log_transfer package
cfg.ini
[kafka]
address=127.0.0.1:9092
topic=web_log
[es]
address=127.0.0.1:9200
size=100000
cfg.go
package conf

// LogTransfer is the global config.
type LogTransfer struct {
    KafkaCfg `ini:"kafka"`
    ESCfg    `ini:"es"`
}

// KafkaCfg maps the [kafka] section.
type KafkaCfg struct {
    Address string `ini:"address"`
    Topic   string `ini:"topic"`
}

// ESCfg maps the [es] section.
type ESCfg struct {
    Address  string `ini:"address"`
    ChanSize int    `ini:"size"`
}
es.go
// Package es receives data forwarded from the Kafka side and indexes it into Elasticsearch.
package es

import (
    "context"
    "fmt"
    "strings"
    "time"

    "github.com/olivere/elastic/v7"
)

// LogData is one log record received from Kafka.
type LogData struct {
    Topic string `json:"topic"`
    Data  string `json:"data"`
}

var (
    client *elastic.Client
    ch     chan *LogData
)

// Init connects to Elasticsearch and starts the background sender.
func Init(address string, chanSize int) (err error) {
    if !strings.HasPrefix(address, "http://") {
        address = "http://" + address
    }
    client, err = elastic.NewClient(elastic.SetURL(address))
    if err != nil {
        return
    }
    fmt.Println("connect to es success")
    ch = make(chan *LogData, chanSize)
    go SendToES()
    return
}

// SendToESChan pushes a record into the internal channel.
func SendToESChan(msg *LogData) {
    ch <- msg
}

// SendToES drains the channel and indexes each record into ES.
func SendToES() {
    for {
        select {
        case msg := <-ch:
            // Chained calls on the ES client. Mapping types were removed in
            // ES 7, so there is no .Type() call here.
            put1, err := client.Index().
                Index(msg.Topic). // the index name, roughly analogous to a database
                BodyJson(msg).    // marshal the Go value to JSON
                Do(context.Background())
            if err != nil {
                fmt.Println(err)
                continue
            }
            fmt.Printf("Indexed %s to index %s\n", put1.Id, put1.Index)
        default:
            time.Sleep(time.Second)
        }
    }
}
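A hedged usage sketch of this package (address from cfg.ini; select{} keeps the background sender alive):

package main

import (
    "test/log_transfer/es"
)

func main() {
    if err := es.Init("127.0.0.1:9200", 100000); err != nil {
        return
    }
    es.SendToESChan(&es.LogData{Topic: "web_log", Data: "hello es"})
    select {}
}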
kafka.go
package kafka

import (
    "fmt"

    "test/log_transfer/es"

    "github.com/Shopify/sarama"
)

// Init creates a Kafka consumer and forwards every message to ES.
func Init(addr []string, topic string) (err error) {
    consumer, err := sarama.NewConsumer(addr, nil)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    partitionList, err := consumer.Partitions(topic) // all partitions of the topic
    if err != nil {
        fmt.Println("fail to get list of partition:", err)
        return
    }
    fmt.Println(partitionList)
    for _, partition := range partitionList {
        pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d, err:%v", partition, err)
            return err
        }
        // Consume each partition asynchronously. Init returns while these
        // goroutines keep running; main blocks with select {}.
        go func(pc sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
                // Hand the record to the es package through its channel.
                ld := es.LogData{
                    Topic: topic,
                    Data:  string(msg.Value),
                }
                es.SendToESChan(&ld)
            }
        }(pc)
    }
    return
}
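One design note: sarama.OffsetNewest only sees messages produced after the consumer starts. To replay everything already in the topic, start each partition consumer from the oldest offset instead:

pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)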
main.go
package main

import (
    "fmt"

    "test/log_transfer/conf"
    "test/log_transfer/es"
    "test/log_transfer/kafka"

    "gopkg.in/ini.v1"
)

// log transfer: take log data out of Kafka and ship it to ES.
func main() {
    // 0. Load the config file.
    var cfg conf.LogTransfer
    err := ini.MapTo(&cfg, "./conf/cfg.ini")
    if err != nil {
        fmt.Println("init config err:", err)
        return
    }
    fmt.Printf("cfg:%v\n", cfg)
    // 1. Initialize ES:
    // 1.1 create the ES client;
    // 1.2 expose a function that writes data into ES.
    err = es.Init(cfg.ESCfg.Address, cfg.ESCfg.ChanSize)
    if err != nil {
        fmt.Println("init ES failed, err:", err)
        return
    }
    fmt.Println("init es success.")
    // 2. Initialize Kafka:
    // 2.1 connect to Kafka and create per-partition consumers;
    // 2.2 each consumer pulls records and forwards them to ES.
    err = kafka.Init([]string{cfg.KafkaCfg.Address}, cfg.KafkaCfg.Topic)
    if err != nil {
        fmt.Println("init kafka consumer failed, err:", err)
        return
    }
    // 3/4. Block forever; the consumer and ES-sender goroutines do the work.
    select {}
}
My own version:
The config package:
config.ini
[kafka]
address=127.0.0.1:9092
chan_max_size=100000
[etcd]
address=127.0.0.1:2379
timeout=5
log_key=/log/collect_config
[es]
address=127.0.0.1:9200
size=100000
config.go
package config

import (
    "context"

    "github.com/hpcloud/tail"
)

// AppConf maps the whole config.ini file.
type AppConf struct {
    KafkaConf `ini:"kafka"`
    EtcdConf  `ini:"etcd"`
    ESConf    `ini:"es"`
}

// KafkaConf maps the [kafka] section.
type KafkaConf struct {
    Address  string `ini:"address"`
    Max_size int    `ini:"chan_max_size"`
}

// EtcdConf maps the [etcd] section.
type EtcdConf struct {
    Address string `ini:"address"`
    Timeout int    `ini:"timeout"`
    Log_key string `ini:"log_key"`
}

// ESConf maps the [es] section.
type ESConf struct {
    Address  string `ini:"address"`
    Max_size int    `ini:"size"`
}

// LogConf is one collection entry, stored in etcd as JSON.
type LogConf struct {
    Path  string `json:"path"`
    Topic string `json:"topic"`
}

// LogEntryConf is the full list of collection entries.
type LogEntryConf []*LogConf

// TailTask is one running collection task.
type TailTask struct {
    Path     string
    Topic    string
    Instance *tail.Tail
    Ctx      context.Context
    CancelF  context.CancelFunc
}

// LogData couples one log line with its topic.
type LogData struct {
    Topic string
    Data  string
}
The es package
es.go
package es

import (
    "context"
    "fmt"
    "strings"
    "test/mylog/config"
    "time"

    "github.com/olivere/elastic/v7"
)

var (
    client *elastic.Client
    ESchan chan *config.LogData
)

// Init connects to ES and starts the background sender.
func Init(address string, size int) (err error) {
    if !strings.HasPrefix(address, "http://") {
        address = "http://" + address
    }
    client, err = elastic.NewClient(elastic.SetURL(address))
    if err != nil {
        fmt.Println("Init ES failed, err:", err)
        return
    }
    ESchan = make(chan *config.LogData, size) // buffer size from config
    go SendToES()
    return
}

// SendToESChan hands one record to the background sender.
func SendToESChan(msg *config.LogData) {
    ESchan <- msg
}

// SendToES drains ESchan and indexes each record.
func SendToES() {
    for {
        select {
        case msg := <-ESchan:
            put1, err := client.Index().Index(msg.Topic).BodyJson(msg).Do(context.Background())
            if err != nil {
                fmt.Println("index to es failed, err:", err)
                continue
            }
            fmt.Printf("Indexed %s to index %s\n", put1.Id, put1.Index)
        default:
            time.Sleep(time.Second)
        }
    }
}
The etcd package
etcd.go
package etcd

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "test/mylog/config"
    "test/mylog/tail"

    "go.etcd.io/etcd/clientv3"
)

var (
    client  *clientv3.Client
    logdata config.LogEntryConf
)

func Init(address []string, timeout int) (err error) {
    client, err = clientv3.New(clientv3.Config{
        Endpoints:   address,
        DialTimeout: time.Duration(timeout) * time.Second,
    })
    if err != nil {
        fmt.Println("connect to etcd failed, err:", err)
        return
    }
    fmt.Println("connect to etcd success!")
    return
}

// GetConf fetches the collection config stored under key.
func GetConf(key string) (logconf config.LogEntryConf, err error) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    var resp *clientv3.GetResponse
    resp, err = client.Get(ctx, key)
    cancel()
    if err != nil {
        fmt.Println("get from etcd failed, err:", err)
        return
    }
    for _, ev := range resp.Kvs {
        err = json.Unmarshal(ev.Value, &logconf)
        if err != nil {
            fmt.Println("Unmarshal json failed:", err)
            return
        }
    }
    return
}

// WatchConf watches the config key and pushes every new config to the tail package.
func WatchConf(key string) {
    rch := client.Watch(context.Background(), key)
    channel := tail.Get_chan()
    for wresp := range rch {
        for _, ev := range wresp.Events {
            logdata = nil // reset so a DELETE event sends an empty config
            if ev.Type != clientv3.EventTypeDelete {
                err := json.Unmarshal(ev.Kv.Value, &logdata)
                if err != nil {
                    fmt.Println("Update conf failed, err:", err)
                    return
                }
            }
            fmt.Println("update config success:", string(ev.Kv.Value))
            channel <- logdata
        }
    }
}
The kafka package
kafka.go
package kafka

import (
    "fmt"
    "time"

    "test/mylog/config"
    "test/mylog/es"

    "github.com/Shopify/sarama"
)

var (
    client      sarama.SyncProducer
    logDataChan chan *config.LogData
    consumer    sarama.Consumer
)

// Init creates the producer, the consumer, and the background sender.
func Init(address []string, max_size int) (err error) {
    cfg := sarama.NewConfig()
    cfg.Producer.RequiredAcks = sarama.WaitForAll
    cfg.Producer.Partitioner = sarama.NewRandomPartitioner
    cfg.Producer.Return.Successes = true
    client, err = sarama.NewSyncProducer(address, cfg)
    if err != nil {
        fmt.Println("Produce error:", err)
        return
    }
    logDataChan = make(chan *config.LogData, max_size)
    consumer, err = sarama.NewConsumer(address, nil)
    if err != nil {
        fmt.Println("Init consumer failed, err:", err)
        return
    }
    go SendMessage()
    return
}

// SendToChan queues one log line for the producer.
func SendToChan(topic, data string) {
    logDataChan <- &config.LogData{
        Topic: topic,
        Data:  data,
    }
}

// SendMessage drains logDataChan and produces to Kafka.
func SendMessage() {
    for {
        select {
        case ld := <-logDataChan:
            msg := sarama.ProducerMessage{}
            msg.Topic = ld.Topic
            msg.Value = sarama.StringEncoder(ld.Data)
            pid, offset, err := client.SendMessage(&msg)
            if err != nil {
                fmt.Println("Send Message error:", err)
                continue
            }
            fmt.Printf("pid:%v offset:%v Topic:%v Value:%v\n", pid, offset, ld.Topic, ld.Data)
        default:
            time.Sleep(time.Millisecond * 50) // avoid spinning when the channel is empty
        }
    }
}

// Consumer starts one goroutine per partition and forwards records to ES.
// It returns immediately; main keeps the process alive.
func Consumer(topic string) {
    partitionList, err := consumer.Partitions(topic)
    if err != nil {
        fmt.Println("Get partitions failed, err:", err)
        return
    }
    for _, partition := range partitionList {
        pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
        if err != nil {
            fmt.Println("failed to start consumer for partition, err:", err)
            return
        }
        go func(pc sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
                es.SendToESChan(&config.LogData{
                    Topic: topic,
                    Data:  string(msg.Value),
                })
            }
        }(pc)
    }
}
The tail package
tail.go
package tail

import (
    "context"
    "fmt"
    "test/mylog/config"
    "test/mylog/kafka"

    "github.com/hpcloud/tail"
)

var (
    tasks_map  map[string]*config.TailTask
    tasks_chan chan config.LogEntryConf
)

// run forwards lines from one tail instance to Kafka until its context is cancelled.
func run(T *config.TailTask) {
    for {
        select {
        case <-T.Ctx.Done():
            return
        case line := <-T.Instance.Lines:
            kafka.SendToChan(T.Topic, line.Text)
        }
    }
}

// Init starts one task per config entry plus the updater goroutine.
func Init(Tvalue config.LogEntryConf) error {
    tasks_map = make(map[string]*config.TailTask, 100)
    tasks_chan = make(chan config.LogEntryConf)
    for _, value := range Tvalue {
        base := config.LogConf{
            Path:  value.Path,
            Topic: value.Topic,
        }
        Task, err := NewTask(base)
        if err != nil {
            fmt.Println("Init tail failed, err:", err)
            return err
        }
        name := fmt.Sprintf("%s\\%s", value.Path, value.Topic)
        tasks_map[name] = Task
        go run(Task)
    }
    go Update_Task()
    return nil
}

// Update_Task stops all running tasks and restarts from the new config
// whenever one arrives on tasks_chan.
func Update_Task() {
    for {
        new_tasks := <-tasks_chan
        for name, old_task := range tasks_map {
            old_task.CancelF()
            delete(tasks_map, name)
        }
        for _, new_task := range new_tasks {
            name := fmt.Sprintf("%s\\%s", new_task.Path, new_task.Topic)
            Task, err := NewTask(*new_task)
            if err != nil {
                fmt.Println("init task err:", err)
                return
            }
            tasks_map[name] = Task
            go run(Task) // start collecting for the new task
        }
    }
}

// Get_chan exposes tasks_chan so the etcd watcher can push new configs.
func Get_chan() chan config.LogEntryConf {
    return tasks_chan
}

// NewTask opens the log file and builds a TailTask (it does not start run).
func NewTask(base config.LogConf) (tal *config.TailTask, err error) {
    cfg := tail.Config{
        ReOpen:    true,
        Follow:    true,
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // start from the end of the file
        MustExist: false,
        Poll:      true,
    }
    instance, err := tail.TailFile(base.Path, cfg)
    if err != nil {
        fmt.Println("tail file failed, err:", err)
        return nil, err
    }
    ctx, cancel := context.WithCancel(context.Background())
    tal = &config.TailTask{
        Path:     base.Path,
        Topic:    base.Topic,
        Instance: instance,
        Ctx:      ctx,
        CancelF:  cancel,
    }
    return
}
main.go
package main

import (
    "fmt"
    "test/mylog/config"
    "test/mylog/es"
    "test/mylog/etcd"
    "test/mylog/kafka"
    "test/mylog/tail"

    "gopkg.in/ini.v1"
)

func main() {
    // Load config.ini.
    var cfg config.AppConf
    err := ini.MapTo(&cfg, "./config/config.ini")
    if err != nil {
        fmt.Println("Decode Map failed!", err)
        return
    }
    // Kafka producer + consumer.
    err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.Max_size)
    if err != nil {
        fmt.Println("init kafka failed", err)
        return
    }
    fmt.Println("init kafka success!")
    // etcd: fetch the collection config.
    err = etcd.Init([]string{cfg.EtcdConf.Address}, cfg.EtcdConf.Timeout)
    if err != nil {
        fmt.Println("init etcd failed", err)
        return
    }
    var path config.LogEntryConf
    path, err = etcd.GetConf(cfg.EtcdConf.Log_key)
    if err != nil {
        fmt.Println("get conf from etcd failed", err)
        return
    }
    // Start tailing and the ES sender.
    if err = tail.Init(path); err != nil {
        return
    }
    if err = es.Init(cfg.ESConf.Address, cfg.ESConf.Max_size); err != nil {
        return
    }
    // One consumer per topic (Consumer returns immediately).
    for index, value := range path {
        fmt.Printf("index:%v value:%v topic:%v\n", index, value, value.Topic)
        kafka.Consumer(value.Topic)
    }
    // Block on the etcd watcher; it pushes config updates to the tail package.
    etcd.WatchConf(cfg.EtcdConf.Log_key)
}